Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#20970] Fix gRPC leak by closing ResidualSource at BoundedToUnboundedSourceAdapter.Reader#init() in Dataflow worker #28548

Merged
merged 3 commits into from
Oct 11, 2023

Conversation

baeminbo
Copy link
Contributor

@baeminbo baeminbo commented Sep 20, 2023

This is another case of #20970 (harmless error logs with gRPC leak).

BigQueryIO.read() with Method.DIRECT_READ in Dataflow streaming jobs can cause Dataflow to output the gRPC leak error logs like below.

This is because getCheckpointMark() in BoundedToUnboundedSourceAdapter calls init() where residualSource field is updated, but it missed closing old residualSource, which caused a leak of ResidualSource#reader closing. The reader in the stacktrace below is BigQueryStorageStreamReader. Not closing BigQueryStorageStreamReader causes the gRPC leak error logs.

Error mesasge:

*~*~*~ Previous channel ManagedChannelImpl{logId=9, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.

Stacktrace:

java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
	at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:631)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:297)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:391)
	at com.google.api.gax.grpc.ChannelPool.<init>(ChannelPool.java:107)
	at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:85)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:237)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:231)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:236)
	at com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub.create(EnhancedBigQueryReadStub.java:99)
	at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.<init>(BigQueryReadClient.java:130)
	at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.create(BigQueryReadClient.java:110)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl.<init>(BigQueryServicesImpl.java:1667)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl.<init>(BigQueryServicesImpl.java:1598)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getStorageClient(BigQueryServicesImpl.java:201)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource$BigQueryStorageStreamReader.<init>(BigQueryStorageStreamSource.java:194)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource$BigQueryStorageStreamReader.<init>(BigQueryStorageStreamSource.java:147)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource.createReader(BigQueryStorageStreamSource.java:138)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource.createReader(BigQueryStorageStreamSource.java:56)
	at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:473)
	at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:452)
	at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:304)
	at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:297)
	at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
	at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@baeminbo baeminbo force-pushed the grpc-leak branch 2 times, most recently from 01b3774 to 204c697 Compare September 22, 2023 06:50
@github-actions github-actions bot added the core label Sep 22, 2023
@baeminbo baeminbo changed the title [#20970] Fix gRPC leak by closing reader at WorkerCustomSources.UnboundedReaderIterator in Dataflow worker [#20970] Fix gRPC leak by closing ResidualSource at BoundedToUnboundedSourceAdapter.Reader#init() in Dataflow worker Sep 22, 2023
@baeminbo
Copy link
Contributor Author

I put a comment not owned to the reader field in the class UnboundedReaderIterator in WorkerCustomSources.java. My first fix was simply to close the reader in UnboundedReaderIterator#close(). But, it had a bug to read the same data again from a reader in integration tests. To avoid this mistake again, I put the comment and additional check at ResidualSource#getCheckpointMark()

@baeminbo
Copy link
Contributor Author

retest this please.

@liferoad
Copy link
Collaborator

Run Java_GCP_IO_Direct PreCommit

@liferoad
Copy link
Collaborator

R: @kennknowles

@liferoad
Copy link
Collaborator

R: @Abacn

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@@ -288,6 +288,15 @@ private void init(
residualElementsList == null
? new ResidualElements(Collections.emptyList())
: new ResidualElements(residualElementsList);

if (this.residualSource != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking - this means that we are calling init() again without ever calling close() on the reader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's at Reader.getCheckpointMark().

The getCheckpointMark() is called at bundle finish in Dataflow streaming jobs, and close() will be pending as readers are cached. A reader can be reused in next bundles.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yea based on that the change is obviously correct. Thanks!

@kennknowles
Copy link
Member

I can see that testing a fix for a memory leak may be challenging, but can we do anything to test this change?

Copy link
Contributor Author

@baeminbo baeminbo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to make a test to confirm the effect of this change :) I'll ping when it's ready.

@@ -288,6 +288,15 @@ private void init(
residualElementsList == null
? new ResidualElements(Collections.emptyList())
: new ResidualElements(residualElementsList);

if (this.residualSource != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's at Reader.getCheckpointMark().

The getCheckpointMark() is called at bundle finish in Dataflow streaming jobs, and close() will be pending as readers are cached. A reader can be reused in next bundles.

@@ -288,6 +288,15 @@ private void init(
residualElementsList == null
? new ResidualElements(Collections.emptyList())
: new ResidualElements(residualElementsList);

if (this.residualSource != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yea based on that the change is obviously correct. Thanks!

@@ -505,6 +514,7 @@ BoundedSource<T> getSource() {
}

Checkpoint<T> getCheckpointMark() {
checkArgument(!closed, "getCheckpointMark() call on closed %s", getClass().getName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be checkState since there are no arguments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@@ -776,7 +776,7 @@ public double getRemainingParallelism() {

private static class UnboundedReaderIterator<T>
extends NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
private final UnboundedSource.UnboundedReader<T> reader;
private final UnboundedSource.UnboundedReader<T> reader; // not owned
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does ownership mean here? Not to do with memory management obviously. If I understand correctly, it means that there may be other calls to various state methods so you can never assume what state the reader will be in? (it would be worth writing the whole long comment about what this means here)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I added a comment to explain it in detail, rather than a short ambiguous comment.

@baeminbo
Copy link
Contributor Author

@kennknowles I added a unit test UnboundedReadFromBoundedSourceTest#testReadersClosedProperly to check reader closing leak. Could you review the change? Thank you.

@kennknowles
Copy link
Member

I feel like I want to run a bit of a variety of integration tests but I'm not 100% sure which ones.

@kennknowles
Copy link
Member

Run Java_IOs_Direct PreCommit

@kennknowles
Copy link
Member

OK the failure we see has been fixed at HEAD. Can you rebase against the master branch and we can re-run to get green.

@baeminbo
Copy link
Contributor Author

baeminbo commented Oct 6, 2023

Run Java_GCP_IO_Direct PreCommit

@baeminbo
Copy link
Contributor Author

Run Java PreCommit

@baeminbo
Copy link
Contributor Author

Run Java_GCP_IO_Direct PreCommit

@baeminbo
Copy link
Contributor Author

Run Java PreCommit

@baeminbo
Copy link
Contributor Author

Run Java_GCP_IO_Direct PreCommit

@baeminbo
Copy link
Contributor Author

@kennknowles Some tests were flaky. But, finally, it passed all the checks.

@kennknowles kennknowles merged commit aedfa46 into apache:master Oct 11, 2023
21 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants